package com.atguigu.flink.state;

import com.atguigu.flink.pojo.UserBehavior;
import com.atguigu.flink.utils.MyUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Created by Smexy on 2023/11/17
 *
 用户对某个商品的一次操作是一行
 userId,itemId,categoryId,behavior,ts(秒)
 543462,1715,1464116,pv,1511658000
 ----------------------------------------
 每间隔1h统计过去1h的Top3热门(点击量高)商品。
 ----------------------------

 输入:
 用户对某个商品的一次操作是一行

 第一次聚合： keyBy(itemId)
 聚合： 事件时间滚动keyed窗口
 size: 1h
 slide : 1h

统计在当前窗口中，每一个商品被pv的次数
 [8:00,9:00)  : a-->200
 [8:00,9:00)  : b-->150
 [8:00,9:00)  : c--->100

 第二次聚合: keyBy(窗口范围 start|end )
  使用list集合收集到达的每一条数据
    List[    [8:00,9:00)  : a-->200,    [8:00,9:00)  : b-->150 ...  ]
    注册一个事件时间为 窗口end + 5s 的定时器，定时器触发后执行top3聚合


 输出:
 [8:00,9:00) top3: a-->200,b-->150,c--->100

 */
public class Demo3_TopNKeyedWindow
{
    /** How many of the most-clicked items are reported per window. */
    private static final int TOP_N = 3;

    /**
     * Event-time delay (ms) past a window's end at which the ranking timer is registered.
     * Gives the first-stage results for that window extra slack to arrive before ranking.
     */
    private static final long TIMER_DELAY_MS = 5000L;

    /**
     * Builds and runs the job.
     *
     * <p>The job parses the CSV lines and keeps only {@code pv} events. It counts clicks per item
     * in 1h tumbling event-time windows. Then, for every window, it ranks the items and prints
     * the Top-N.
     *
     * @param args unused
     * @throws IllegalStateException if job execution fails (the original cause is attached)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        FileSource<String> source = FileSource
            .forRecordStreamFormat(new TextLineInputFormat(), new Path("data/UserBehavior.csv"))
            .build();

        // Event time comes from UserBehavior.ts, which ParseUserBehavior stores in milliseconds.
        WatermarkStrategy<UserBehavior> watermarkStrategy = WatermarkStrategy
            .<UserBehavior>forMonotonousTimestamps()
            .withTimestampAssigner((e, ts) -> e.getTs());

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "data")
            .map(new ParseUserBehavior())
            // "Hot" is measured by pv count, so every other behavior is dropped.
            .filter(u -> "pv".equals(u.getBehavior()))
            .assignTimestampsAndWatermarks(watermarkStrategy)
            // First aggregation: pv count of each item in each 1h window.
            .keyBy(UserBehavior::getItemId)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .aggregate(new PvCounter(), new AttachWindowInfo())
            // Second aggregation: collect all item counts that share a window end, then rank them.
            .keyBy(HotItem::getEnd)
            .process(new TopNPerWindow())
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing, so a failed job does not look like a success.
            throw new IllegalStateException("Flink job failed", e);
        }
    }

    /** Parses one CSV line {@code userId,itemId,categoryId,behavior,ts(seconds)}. */
    private static class ParseUserBehavior implements MapFunction<String, UserBehavior> {
        @Override
        public UserBehavior map(String value) throws Exception {
            String[] words = value.split(",");
            return new UserBehavior(
                words[0],
                words[1],
                words[2],
                words[3],
                Long.parseLong(words[4]) * 1000 // seconds -> milliseconds
            );
        }
    }

    /** Incrementally counts the events of one item inside one window. */
    private static class PvCounter implements AggregateFunction<UserBehavior, Integer, HotItem> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(UserBehavior value, Integer accumulator) {
            return accumulator + 1;
        }

        @Override
        public HotItem getResult(Integer accumulator) {
            // Window range and item id are filled in afterwards by AttachWindowInfo.
            return new HotItem(null, null, null, accumulator);
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            // Only used when windows are merged. Returning null here would NPE in add().
            return a + b;
        }
    }

    /** Completes the pre-aggregated HotItem of a window with its window range and item id. */
    private static class AttachWindowInfo implements WindowFunction<HotItem, HotItem, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow window, Iterable<HotItem> input, Collector<HotItem> out) throws Exception {
            // With aggregate(agg, windowFn) the input holds the single aggregation result.
            HotItem hotItem = input.iterator().next();
            hotItem.setStart(window.getStart());
            hotItem.setEnd(window.getEnd());
            hotItem.setItemId(key);
            out.collect(hotItem);
        }
    }

    /**
     * Keyed by window end.
     *
     * <p>Buffers every item count of that window. It emits
     * {@code "<window> top3:id-->pv,..."} once an event-time timer fires, after the window end
     * plus {@link #TIMER_DELAY_MS}.
     */
    private static class TopNPerWindow extends KeyedProcessFunction<Long, HotItem, String> {

        /** Non-null once the ranking timer for the current key has been registered. */
        private transient ValueState<Boolean> ifNeedTimer;
        /** All item counts received for the current window. */
        private transient ListState<HotItem> items;

        @Override
        public void open(Configuration parameters) throws Exception {
            items = getRuntimeContext().getListState(new ListStateDescriptor<>("items", HotItem.class));
            ifNeedTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("ifNeedTimer", Boolean.class));
        }

        /** Called once per incoming record (this is not a window). Buffers it and arms the timer. */
        @Override
        public void processElement(HotItem value, Context ctx, Collector<String> out) throws Exception {
            items.add(value);
            // Register the timer only for the first record of this window end.
            if (ifNeedTimer.value() == null) {
                TimerService timerService = ctx.timerService();
                timerService.registerEventTimeTimer(value.getEnd() + TIMER_DELAY_MS);
                ifNeedTimer.update(true);
            }
        }

        /** Ranks the buffered items by pv (descending; ties by item id) and emits the Top-N. */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // Sequential stream: Iterable spliterators are unordered, so a parallel sort made
            // tie-breaking nondeterministic. The explicit thenComparing makes output repeatable.
            List<HotItem> topItems = StreamSupport.stream(items.get().spliterator(), false)
                .sorted(Comparator.comparing(HotItem::getPv, Comparator.<Integer>reverseOrder())
                                  .thenComparing(HotItem::getItemId))
                .limit(TOP_N)
                .collect(Collectors.toList());

            // Each window end is used as a key for one ranking only; release its state now,
            // otherwise state for every past window accumulates forever.
            items.clear();
            ifNeedTimer.clear();

            if (topItems.isEmpty()) {
                return;
            }

            // All items under this key share the same window range.
            HotItem first = topItems.get(0);
            String timeStr = MyUtil.parseTimeWindow(new TimeWindow(first.getStart(), first.getEnd()));

            String topStr = topItems.stream()
                .map(i -> i.getItemId() + "-->" + i.getPv())
                .collect(Collectors.joining(","));

            out.collect(timeStr + " top" + TOP_N + ":" + topStr);
        }
    }

    /**
     * Click count of one item inside one window, e.g. {@code [8:00,9:00) : a-->200}.
     *
     * <p>Public with a public no-arg constructor and getters/setters (via Lombok), so Flink can
     * treat it as a POJO instead of falling back to generic Kryo serialization.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class HotItem {
        /** Window start (inclusive), epoch millis. */
        private Long start;
        /** Window end (exclusive), epoch millis; also the key of the second aggregation. */
        private Long end;
        private String itemId;
        /** Number of pv events of {@code itemId} inside the window. */
        private Integer pv;
    }
}
